Benchmark

Because the libraries depend on mutually incompatible TensorFlow versions, two sets of benchmarks are executed.

1 Benchmark 1

The first benchmark compares OpenAI/Baselines, Ray/RLlib, Chainer/ChainerRL, and cpprb.
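
The benchmark times two operations for each library: adding transitions and sampling minibatches. For orientation, a minimal cpprb-only sketch of these two calls is shown below; it uses the same buffer layout as the benchmark configuration, while the step count and batch size here are arbitrary:

import numpy as np
from cpprb import ReplayBuffer

buffer_size = 2**12
env_dict = {"obs": {"shape": 15},
            "act": {"shape": 3},
            "next_obs": {"shape": 15},
            "rew": {},
            "done": {}}

rb = ReplayBuffer(buffer_size, env_dict)

# Add 100 transitions at once; the leading axis is the step axis.
n = 100
rb.add(obs=np.ones((n, 15)),
       act=np.zeros((n, 3)),
       next_obs=np.ones((n, 15)),
       rew=np.zeros(n),
       done=np.zeros(n))

# Sample a minibatch of 32 transitions as a dict of NumPy arrays.
batch = rb.sample(32)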

1.1 Settings

We use the following Docker image to run the benchmarks:

FROM python:3.7

RUN apt update \
	&& apt install -y --no-install-recommends \
	libopenmpi-dev libgl1-mesa-dev zlib1g-dev \
	&& apt clean \
	&& rm -rf /var/lib/apt/lists/* \
	&& pip install tensorflow==1.14 \
	&& pip install gym \
	&& pip install pandas ray[rllib] chainerrl perfplot \
	&& git clone https://github.com/openai/baselines.git \
	&& pip install ./baselines \
	&& rm -rf baselines


# OpenAI Baselines requires TensorFlow 1.14
# OpenAI Baselines at PyPI seems to be obsolete.
# gym (from cpprb) requires a certain version of cloudpickle (install earlier)
# RLlib silently requires Pandas

CMD ["bash"]
  • OpenAI Baselines requires TensorFlow 1.14
  • OpenAI Baselines at PyPI seems to be obsolete and requires non-free MuJoCo.
  • RLlib requires Pandas, too.

The benchmark script is as follows:

import matplotlib.pyplot as plt
import numpy as np
import perfplot
import gc

# OpenAI/Baselines: https://github.com/openai/baselines
# Requires TensorFlow 1.14 instead of 2
from baselines.deepq.replay_buffer import (ReplayBuffer as bRB,
                                           PrioritizedReplayBuffer as bPRB)

# Ray/RLlib: https://github.com/ray-project/ray
# Requires Pandas, even though it is not in `install_requires`
from ray.rllib.execution.replay_buffer import (ReplayBuffer as rRB,
                                               PrioritizedReplayBuffer as rPRB)
from ray.rllib.policy.sample_batch import SampleBatch

# Chainer/ChainerRL: https://github.com/chainer/chainerrl
from chainerrl.replay_buffers import (ReplayBuffer as cRB,
                                      PrioritizedReplayBuffer as cPRB)

from cpprb import (ReplayBuffer as RB,
                   PrioritizedReplayBuffer as PRB)


# Configuration
buffer_size = 2**12

obs_shape = 15
act_shape = 3

alpha = 0.4
beta  = 0.4

env_dict = {"obs": {"shape": obs_shape},
            "act": {"shape": act_shape},
            "next_obs": {"shape": obs_shape},
            "rew": {},
            "done": {}}


# Initialize Replay Buffer
brb = bRB(buffer_size)
rrb = rRB(buffer_size)
rrb._num_sampled = 0 # Fix: https://github.com/ray-project/ray/issues/14818

crb = cRB(buffer_size)
rb  =  RB(buffer_size,env_dict)


# Initialize Prioritized Replay Buffer
bprb = bPRB(buffer_size,alpha=alpha)
rprb = rPRB(buffer_size,alpha=alpha)
cprb = cPRB(buffer_size,alpha=alpha,beta0=beta,betasteps=None)
prb  =  PRB(buffer_size,env_dict,alpha=alpha)



# Helper Function
def env(n):
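    """ Generate n dummy transitions used as perfplot's setup
    """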
    e = {"obs": np.ones((n,obs_shape)),
         "act": np.zeros((n,act_shape)),
         "next_obs": np.ones((n,obs_shape)),
         "rew": np.zeros(n),
         "done": np.zeros(n)}
    return e

def add_b(_rb):
    """ Add for Baselines
    """
    def add(e):
        for i in range(e["obs"].shape[0]):
            _rb.add(obs_t=e["obs"][i],
                    action=e["act"][i],
                    reward=e["rew"][i],
                    obs_tp1=e["next_obs"][i],
                    done=e["done"][i])
    return add

def add_r(_rb,with_priority=False):
    """ Add for RLlib

    Notes
    -----
    Even `ReplayBuffer` requires the `weight` parameter (but does not use it).
    """
    def add(e):
        for i in range(e["obs"].shape[0]):
            _rb.add(SampleBatch(obs_t=[e["obs"][i]],
                                action=[e["act"][i]],
                                reward=[e["rew"][i]],
                                obs_tp1=[e["next_obs"][i]],
                                done=[e["done"][i]]),
                    weight=0.5)

    def add_with_p(e):
        for i in range(e["obs"].shape[0]):
            _rb.add(SampleBatch(obs_t=[e["obs"][i]],
                                action=[e["act"][i]],
                                reward=[e["rew"][i]],
                                obs_tp1=[e["next_obs"][i]],
                                done=[e["done"][i]]),
                    weight=e["priority"][i])

    if with_priority:
        return add_with_p
    else:
        return add

def add_c(_rb):
    """ Add for ChainerRL
    """
    def add(e):
        for i in range(e["obs"].shape[0]):
            _rb.append(state=e["obs"][i],
                       action=e["act"][i],
                       reward=e["rew"][i],
                       next_state=e["next_obs"][i],
                       is_state_terminal=e["done"][i])
    return add

def sample_c(_rb):
    """ Force sample from ChainerRL PrioritizedReplayBuffer
    """
    def sample(n):
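        # ChainerRL normally requires new priorities to be set after each sample
        # before the next one; disabling this internal check allows the benchmark
        # to call sample repeatedly.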
        _rb.memory.wait_priority_after_sampling = False
        return _rb.sample(n)

    return sample


# ReplayBuffer.add
perfplot.plot(setup = env,
              time_unit="ms",
              kernels = [add_b(brb),
                         add_r(rrb),
                         add_c(crb),
                         lambda e: rb.add(**e)],
              labels = ["OpenAI/Baselines","Ray/RLlib","Chainer/ChainerRL","cpprb"],
              n_range = [n for n in range(1,102,10)],
              xlabel = "Step size added at once",
              logx = False,
              logy = False,
              equality_check = None)
plt.title("Replay Buffer Add Speed")
plt.savefig("ReplayBuffer_add.png",
            transparent=True,
            bbox_inches="tight")
plt.close()

# Fill Buffers
o = np.random.rand(buffer_size,obs_shape)
e = {"obs": o, # [0,1)
     "act": np.random.rand(buffer_size,act_shape),
     "rew": np.random.rand(buffer_size),
     "next_obs": o,
     "done": np.random.randint(2,size=buffer_size)} # [0,2) == 0 or 1

add_b(brb)(e)
add_r(rrb)(e)
add_c(crb)(e)
rb.add(**e)

# ReplayBuffer.sample
perfplot.plot(setup = lambda n: n,
              time_unit="ms",
              kernels = [brb.sample,
                         rrb.sample,
                         crb.sample,
                         rb.sample],
              labels = ["OpenAI/Baselines",
                        "Ray/RLlib",
                        "Chainer/ChainerRL",
                        "cpprb"],
              n_range = [2**n for n in range(1,8)],
              xlabel = "Batch size",
              logx = False,
              logy = False,
              equality_check=None)
plt.title("Replay Buffer Sample Speed")
plt.savefig("ReplayBuffer_sample.png",
            transparent=True,
            bbox_inches="tight")
plt.close()

# PrioritizedReplayBuffer.add
perfplot.plot(time_unit="ms",
              setup = env,
              kernels = [add_b(bprb),
                         add_r(rprb),
                         add_c(cprb),
                         lambda e: prb.add(**e)],
              labels = ["OpenAI/Baselines",
                        "Ray/RLlib",
                        "Chainer/ChainerRL",
                        "cpprb"],
              n_range = [n for n in range(1,102,10)],
              xlabel = "Step size added at once",
              logx = False,
              logy = False,
              equality_check=None)
plt.title("Prioritized Replay Buffer Add Speed")
plt.savefig("PrioritizedReplayBuffer_add.png",
            transparent=True,
            bbox_inches="tight")
plt.close()


# Fill Buffers
o = np.random.rand(buffer_size,obs_shape)
p = np.random.rand(buffer_size)
e = {"obs": o, # [0,1)
     "act": np.random.rand(buffer_size,act_shape),
     "rew": np.random.rand(buffer_size),
     "next_obs": o,
     "done": np.random.randint(2,size=buffer_size)} # [0,2) == 0 or 1

# OpenAI/Baselines cannot set priorities at add time, so update them afterwards.
add_b(bprb)(e)
bprb.update_priorities(np.arange(buffer_size,dtype=np.int),p)

e["priority"] = p

add_r(rprb,with_priority=True)(e)
prb.add(**e)

for i in range(buffer_size):
    o = e["obs"][i]
    a = e["act"][i]
    r = e["rew"][i]
    d = e["next_obs"][i]
    p = e["priority"][i]

    # Directly access internal PrioritizedBuffer,
    # since ChainerRL/PrioritizedReplayBuffer has no API to set priority.
    cprb.memory.append([{"state":o,
                        "action":a,
                        "reward":r,
                        "next_state":o,
                        "is_state_terminal":d}],
                       priority=p)


perfplot.plot(time_unit="ms",
              setup = lambda n: n,
              kernels = [lambda n: bprb.sample(n,beta=beta),
                         lambda n: rprb.sample(n,beta=beta),
                         sample_c(cprb),
                         lambda n: prb.sample(n,beta=beta)],
              labels = ["OpenAI/Baselines",
                        "Ray/RLlib",
                        "Chainer/ChainerRL",
                        "cpprb"],
              n_range = [2**n for n in range(1,9)],
              xlabel = "Batch size",
              logx=False,
              logy=False,
              equality_check=None)
plt.title("Prioritized Replay Buffer Sample Speed")
plt.savefig("PrioritizedReplayBuffer_sample.png",
            transparent=True,
            bbox_inches="tight")
plt.close()

1.2 Results

2 Benchmark 2

The second benchmark compares DeepMind/Reverb and cpprb. Since Reverb provides multiple ways of adding (`Client.insert`, `Client.writer`, and `TFClient.insert`) and sampling (`Client.sample`, `TFClient.sample`, and `TFClient.dataset`), each of them is benchmarked separately.

2.1 Settings

We use the following Docker image to run the benchmarks:

FROM python:3.7

RUN apt update \
	&& apt install -y --no-install-recommends libopenmpi-dev zlib1g-dev \
	&& apt clean \
	&& rm -rf /var/lib/apt/lists/* \
	&& pip install tf-nightly==2.3.0.dev20200604 dm-reverb-nightly==0.1.0.dev20200616 perfplot


# Reverb requires a development version of TensorFlow

CMD ["bash"]
  • DeepMind/Reverb requires a development version of TensorFlow (tf-nightly 2.3.0.dev20200604).

The benchmark script is as follows:

import gc
import itertools

import matplotlib.pyplot as plt
import numpy as np
import perfplot
import tensorflow as tf

# DeepMind/Reverb: https://github.com/deepmind/reverb
import reverb

from cpprb import (ReplayBuffer as RB,
                   PrioritizedReplayBuffer as PRB)


# Configuration
buffer_size = 2**12

obs_shape = 15
act_shape = 3

alpha = 0.4
beta  = 0.4

env_dict = {"obs": {"shape": obs_shape},
            "act": {"shape": act_shape},
            "next_obs": {"shape": obs_shape},
            "rew": {},
            "done": {}}


# Initialize Replay Buffer
rb  =  RB(buffer_size,env_dict)


# Initialize Prioritized Replay Buffer
prb  =  PRB(buffer_size,env_dict,alpha=alpha)


# Initialize Reverb Server
server = reverb.Server(tables =[
    reverb.Table(name='ReplayBuffer',
                 sampler=reverb.selectors.Uniform(),
                 remover=reverb.selectors.Fifo(),
                 max_size=buffer_size,
                 rate_limiter=reverb.rate_limiters.MinSize(1)),
    reverb.Table(name='PrioritizedReplayBuffer',
                 sampler=reverb.selectors.Prioritized(alpha),
                 remover=reverb.selectors.Fifo(),
                 max_size=buffer_size,
                 rate_limiter=reverb.rate_limiters.MinSize(1))
])

client = reverb.Client(f"localhost:{server.port}")
tf_client = reverb.TFClient(f"localhost:{server.port}")


# Helper Function
def env(n):
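    """ Generate n dummy transitions used as perfplot's setup
    """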
    e = {"obs": np.ones((n,obs_shape)),
         "act": np.zeros((n,act_shape)),
         "next_obs": np.ones((n,obs_shape)),
         "rew": np.zeros(n),
         "done": np.zeros(n)}
    return e

def add_client(_rb,table):
    """ Add for Reverb Client
    """
    def add(e):
        n = e["obs"].shape[0]
        with _rb.writer(max_sequence_length=1) as _w:
            for i in range(n):
                _w.append([e["obs"][i],
                           e["act"][i],
                           e["rew"][i],
                           e["next_obs"][i],
                           e["done"][i]])
                _w.create_item(table,1,1.0)
    return add

def add_client_insert(_rb,table):
    """ Add for Reverb Client
    """
    def add(e):
        n = e["obs"].shape[0]
        for i in range(n):
            _rb.insert([e["obs"][i],
                        e["act"][i],
                        e["rew"][i],
                        e["next_obs"][i],
                        e["done"][i]],priorities={table: 1.0})
    return add

def add_tf_client(_rb,table):
    """ Add for Reverb TFClient
    """
    def add(e):
        n = e["obs"].shape[0]
        for i in range(n):
            _rb.insert([tf.constant(e["obs"][i]),
                        tf.constant(e["act"][i]),
                        tf.constant(e["rew"][i]),
                        tf.constant(e["next_obs"][i]),
                        tf.constant(e["done"])],
                       tf.constant([table]),
                       tf.constant([1.0],dtype=tf.float64))
    return add

def sample_client(_rb,table):
    """ Sample from Reverb Client
    """
    def sample(n):
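        # Client.sample returns a generator; materialize it so sampling is actually timed.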
        return [i for i in _rb.sample(table,num_samples=n)]

    return sample

def sample_tf_client(_rb,table):
    """ Sample from Reverb TFClient
    """
    def sample(n):
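        # TFClient.sample returns one sample per call, so call it n times for a batch.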
        return [_rb.sample(table,
                           [tf.float64,tf.float64,tf.float64,tf.float64,tf.float64])
                for _ in range(n)]

    return sample

def sample_tf_client_dataset(_rb,table):
    """ Sample from Reverb TFClient using dataset
    """
    def sample(n):
        dataset=_rb.dataset(table,
                            [tf.float64,tf.float64,tf.float64,tf.float64,tf.float64],
                            [obs_shape,act_shape,1,obs_shape,1])  # shapes of (obs, act, rew, next_obs, done)
        return itertools.islice(dataset,n)
    return sample


# ReplayBuffer.add
perfplot.plot(setup = env,
              time_unit="ms",
              kernels = [add_client_insert(client,"ReplayBuffer"),
                         add_client(client,"ReplayBuffer"),
                         add_tf_client(tf_client,"ReplayBuffer"),
                         lambda e: rb.add(**e)],
              labels = ["DeepMind/Reverb: Client.insert",
                        "DeepMind/Reverb: Client.writer",
                        "DeepMind/Reverb: TFClient.insert",
                        "cpprb"],
              n_range = [n for n in range(1,102,10)],
              xlabel = "Step size added at once",
              logx = False,
              logy = False,
              equality_check = None)
plt.title("Replay Buffer Add Speed")
plt.savefig("ReplayBuffer_add2.png",
            transparent=True,
            bbox_inches="tight")
plt.close()


# Fill Buffers
for _ in range(buffer_size):
    o = np.random.rand(obs_shape) # [0,1)
    a = np.random.rand(act_shape)
    r = np.random.rand(1)
    d = np.random.randint(2) # [0,2) == 0 or 1
    client.insert([o,a,r,o,d],priorities={"ReplayBuffer": 1.0})
    rb.add(obs=o,act=a,rew=r,next_obs=o,done=d)


# ReplayBuffer.sample
perfplot.plot(setup = lambda n: n,
              time_unit="ms",
              kernels = [sample_client(client,"ReplayBuffer"),
                         sample_tf_client(tf_client,"ReplayBuffer"),
                         sample_tf_client_dataset(tf_client,"ReplayBuffer"),
                         rb.sample],
              labels = ["DeepMind/Reverb: Client.sample",
                        "DeepMind/Reverb: TFClient.sample",
                        "DeepMind/Reverb: TFClient.dataset",
                        "cpprb"],
              n_range = [2**n for n in range(1,8)],
              xlabel = "Batch size",
              logx = False,
              logy = False,
              equality_check=None)
plt.title("Replay Buffer Sample Speed")
plt.savefig("ReplayBuffer_sample2.png",
            transparent=True,
            bbox_inches="tight")
plt.close()


# PrioritizedReplayBuffer.add
perfplot.plot(time_unit="ms",
              setup = env,
              kernels = [add_client_insert(client,"PrioritizedReplayBuffer"),
                         add_client(client,"PrioritizedReplayBuffer"),
                         add_tf_client(tf_client,"PrioritizedReplayBuffer"),
                         lambda e: prb.add(**e)],
              labels = ["DeepMind/Reverb: Client.insert",
                        "DeepMind/Reverb: Client.writer",
                        "DeepMind/Reverb: TFClient.insert",
                        "cpprb"],
              n_range = [n for n in range(1,102,10)],
              xlabel = "Step size added at once",
              logx = False,
              logy = False,
              equality_check=None)
plt.title("Prioritized Replay Buffer Add Speed")
plt.savefig("PrioritizedReplayBuffer_add2.png",
            transparent=True,
            bbox_inches="tight")
plt.close()


# Fill Buffers
for _ in range(buffer_size):
    o = np.random.rand(obs_shape) # [0,1)
    a = np.random.rand(act_shape)
    r = np.random.rand(1)
    d = np.random.randint(2) # [0,2) == 0 or 1
    p = np.random.rand(1)

    client.insert([o,a,r,o,d],priorities={"PrioritizedReplayBuffer": p})

    prb.add(obs=o,act=a,rew=r,next_obs=o,done=d,priority=p)


perfplot.plot(time_unit="ms",
              setup = lambda n: n,
              kernels = [sample_client(client,"PrioritizedReplayBuffer"),
                         sample_tf_client(tf_client,"PrioritizedReplayBuffer"),
                         sample_tf_client_dataset(tf_client,"PrioritizedReplayBuffer"),
                         lambda n: prb.sample(n,beta=beta)],
              labels = ["DeepMind/Reverb: Client.sample",
                        "DeepMind/Reverb: TFClient.sample",
                        "DeepMind/Reverb: TFClient.dataset",
                        "cpprb"],
              n_range = [2**n for n in range(1,9)],
              xlabel = "Batch size",
              logx=False,
              logy=False,
              equality_check=None)
plt.title("Prioritized Replay Buffer Sample Speed")
plt.savefig("PrioritizedReplayBuffer_sample2.png",
            transparent=True,
            bbox_inches="tight")
plt.close()

2.2 Results